
Tasks
In Spark, a Task (aka command) is the smallest individual unit of execution and corresponds to a single RDD partition. Tasks are launched on executors.

In other (more technical) words, a task is a computation on a data partition in a stage of an RDD in a Spark job.
The Task contract expects that custom tasks define the runTask method.
runTask(context: TaskContext): T
Note
|
T is the type defined when a Task is created.
|
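The contract above can be illustrated with a minimal sketch. SketchContext, SketchTask and SumTask are hypothetical stand-ins invented for this example (they are not Spark's TaskContext or Task), and the sketch only shows how the result type T is fixed when a concrete task is created.

```scala
// Hypothetical stand-ins: these are NOT Spark's TaskContext or Task classes.
final case class SketchContext(stageId: Int, partitionId: Int)

// T is fixed when a concrete task is created, as in the contract above.
abstract class SketchTask[T](val stageId: Int, val partitionId: Int) {
  def runTask(context: SketchContext): T
}

// A custom task that sums the records of the single partition it owns.
class SumTask(stage: Int, partition: Int, records: Seq[Int])
    extends SketchTask[Long](stage, partition) {
  override def runTask(context: SketchContext): Long =
    records.map(_.toLong).sum
}

object RunTaskSketch {
  // Creates one task and runs it with a context built from its own ids.
  def demo(): Long = {
    val task = new SumTask(stage = 0, partition = 3, records = Seq(1, 2, 3))
    task.runTask(SketchContext(task.stageId, task.partitionId))
  }
}
```

Here T is Long for SumTask, so runTask returns a Long; a different custom task could pick any other result type.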
| Name | Description |
|---|---|
| | Used when ??? |
| | TaskMemoryManager that manages the memory allocated by the task. Used when ??? |
| | Used when ??? |
| | Used when ??? |
| | Used when ??? |
| | Used when ??? |
| | Used when ??? |
| | Set for a |
A task can only belong to one stage and operate on a single partition. All tasks in a stage must be completed before the stages that follow can start.
Tasks are spawned one by one for each stage and partition.
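The one-task-per-partition relationship can be sketched as follows. TaskSlot and TaskSpawnSketch are hypothetical names used only for illustration, and the assumption that partition ids run from 0 up to the number of partitions is part of the sketch, not a statement about the scheduler's code.

```scala
// Hypothetical record of "which stage, which partition" a task works on.
final case class TaskSlot(stageId: Int, partitionId: Int)

object TaskSpawnSketch {
  // One task per partition of the stage.
  // Assumption: partition ids are 0 until numPartitions.
  def tasksFor(stageId: Int, numPartitions: Int): Seq[TaskSlot] =
    (0 until numPartitions).map(p => TaskSlot(stageId, p))
}
```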
Caution
|
FIXME What are stageAttemptId and taskAttemptId ?
|
A task in Spark is represented by the Task abstract class with two concrete implementations:
- ShuffleMapTask, which executes a task and divides the task's output into multiple buckets (based on the task's partitioner).
- ResultTask, which executes a task and sends the task's output back to the driver application.
The very last stage in a Spark job consists of multiple ResultTasks, while earlier stages consist only of ShuffleMapTasks.
Caution
|
FIXME You could have a Spark job with ShuffleMapTask being the last. |
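The difference between the two kinds of output can be sketched without Spark. The object and method names below are hypothetical, and the hash-modulo bucketing is an assumption chosen for illustration (a real task uses whatever partitioner it was given).

```scala
object ShuffleBucketSketch {
  // Non-negative modulo, so negative hash codes still map to a valid bucket.
  def bucketOf(key: Any, numBuckets: Int): Int = {
    val raw = key.hashCode % numBuckets
    if (raw < 0) raw + numBuckets else raw
  }

  // Shuffle-map style: split one partition's records into per-bucket groups.
  def divide[K, V](records: Seq[(K, V)], numBuckets: Int): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => bucketOf(k, numBuckets) }

  // Result style: compute a single value that would go back to the driver.
  def result[K, V](records: Seq[(K, V)]): Int = records.size
}
```

In the shuffle-map style the output stays keyed by bucket for later stages to read, while the result style produces a value for the driver.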
Creating Task Instance
Task[T](
  val stageId: Int,
  val stageAttemptId: Int,
  val partitionId: Int,
  var localProperties: Properties = new Properties,
  serializedTaskMetrics: Array[Byte] =
    SparkEnv.get.closureSerializer.newInstance().serialize(TaskMetrics.registered).array(),
  val jobId: Option[Int] = None,
  val appId: Option[String] = None,
  val appAttemptId: Option[String] = None)
extends Serializable
Task Attributes
A Task instance is uniquely identified by the following task attributes:
- stageId - there can be many stages in a job. Every stage has its own unique stageId that the task belongs to.
- stageAttemptId - a stage can be re-attempted for execution in case of failure. stageAttemptId represents the attempt id of a stage that the task belongs to.
- partitionId - a task is a unit of work on a partitioned distributed dataset. Every partition has its own unique partitionId that a task processes.
- metrics - an instance of TaskMetrics for the task.
- localProperties - local private properties of the task.
Running Task Thread: run Method
run(
  taskAttemptId: Long,
  attemptNumber: Int,
  metricsSystem: MetricsSystem): T
run registers the task attempt id with the executor's BlockManager and creates a TaskContextImpl that in turn gets set as the thread-local TaskContext.
If the task was marked as killed before it started running, run kills it (with the interruptThread flag disabled).
run then executes the task by calling runTask.
Caution
|
FIXME Describe catch and finally blocks.
|
Note
|
When run is called from TaskRunner.run, the Task has just been deserialized from taskBytes that were sent over the wire to an executor. localProperties and TaskMemoryManager are already assigned.
|
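The order of steps in run can be sketched in a simplified, Spark-free model. Every name below is hypothetical, the BlockManager registration and MetricsSystem are left out, and clearing the thread-local in a finally block is an assumption of this sketch rather than a description of the catch and finally blocks in Spark.

```scala
// Hypothetical context; Spark's TaskContextImpl carries far more state.
final class RunSketchContext(val taskAttemptId: Long, val attemptNumber: Int) {
  @volatile private var interrupted = false
  def markInterrupted(): Unit = { interrupted = true }
  def isInterrupted: Boolean = interrupted
}

// Holds the context of the task running on the current thread.
object RunSketchContextHolder {
  private val current = new ThreadLocal[RunSketchContext]
  def set(c: RunSketchContext): Unit = current.set(c)
  def get: RunSketchContext = current.get
  def unset(): Unit = current.remove()
}

abstract class RunSketchTask[T] {
  @volatile private var killed = false
  private var context: RunSketchContext = null

  def runTask(context: RunSketchContext): T

  def kill(interruptThread: Boolean): Unit = {
    killed = true
    if (context != null) context.markInterrupted()
    // interruptThread is ignored here: this sketch tracks no task thread.
  }

  final def run(taskAttemptId: Long, attemptNumber: Int): T = {
    context = new RunSketchContext(taskAttemptId, attemptNumber)
    RunSketchContextHolder.set(context)          // thread-local context
    try {
      if (killed) kill(interruptThread = false)  // killed before it started
      runTask(context)
    } finally {
      RunSketchContextHolder.unset()             // assumption: always cleared
    }
  }
}

// Example task that reports whether it was interrupted before running.
class EchoAttemptTask extends RunSketchTask[String] {
  override def runTask(context: RunSketchContext): String =
    if (context.isInterrupted) "interrupted"
    else s"attempt ${context.attemptNumber}"
}
```

A task killed before run is called has no context yet, so the second kill inside run is what marks the freshly created context as interrupted.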
Task States
A task can be in one of the following states:
- LAUNCHING
- RUNNING when the task is being started.
- FINISHED when the task finished with the serialized result.
- FAILED when the task fails, e.g. when FetchFailedException, CommitDeniedException or any Throwable occurs.
- KILLED when an executor kills a task.
- LOST
States are the values of org.apache.spark.TaskState.
Note
|
Task status updates are sent from executors to the driver through ExecutorBackend. |
A task is finished when it is in one of the FINISHED, FAILED, KILLED or LOST states.
The LOST and FAILED states are considered failures.
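The two rules above (which states count as finished, and which count as failures) can be written down as a small enumeration. SketchTaskState is a hypothetical name for this illustration; it is modelled on the states listed here and is not a copy of org.apache.spark.TaskState.

```scala
object SketchTaskState extends Enumeration {
  val LAUNCHING, RUNNING, FINISHED, FAILED, KILLED, LOST = Value

  // States in which a task is no longer running.
  private val finishedStates = Set(FINISHED, FAILED, KILLED, LOST)

  def isFinished(state: Value): Boolean = finishedStates.contains(state)

  // Only LOST and FAILED are treated as failures; KILLED is not.
  def isFailed(state: Value): Boolean = state == LOST || state == FAILED
}
```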
Tip
|
Task states correspond to org.apache.mesos.Protos.TaskState. |
Collect Latest Values of Accumulators: collectAccumulatorUpdates Method
collectAccumulatorUpdates(taskFailed: Boolean = false): Seq[AccumulableInfo]
collectAccumulatorUpdates collects the latest values of accumulators used in a task (and returns the values as a collection of AccumulableInfo).
Note
|
It is used in TaskRunner to send a task’s final results with the latest values of accumulators used. |
When taskFailed is true, it filters out accumulators with countFailedValues disabled.
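The filtering rule can be sketched on a plain collection. SketchAccum and AccumUpdateSketch are hypothetical types invented for this example (they are not AccumulableInfo or any Spark accumulator class); only the countFailedValues rule is taken from the description above.

```scala
// Hypothetical accumulator snapshot used only for this illustration.
final case class SketchAccum(name: String, value: Long, countFailedValues: Boolean)

object AccumUpdateSketch {
  // For a failed task, keep only accumulators that opted in to counting
  // values from failed tasks; otherwise keep everything.
  def collect(accums: Seq[SketchAccum], taskFailed: Boolean = false): Seq[SketchAccum] =
    accums.filter(a => !taskFailed || a.countFailedValues)
}
```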
Caution
|
FIXME Why is the check context != null ?
|
Note
|
It uses context.taskMetrics.accumulatorUpdates() .
|
Caution
|
FIXME What is context.taskMetrics.accumulatorUpdates() doing?
|
Killing Task: kill Method
kill(interruptThread: Boolean)
kill marks the task to be killed, i.e. it sets the internal _killed flag to true.
It calls TaskContextImpl.markInterrupted when context is set.
If interruptThread is enabled and the internal taskThread is available, kill interrupts it.
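The three steps of kill can be sketched as below. KillSketchContext and KillSketchTask are hypothetical classes, and exposing context and taskThread as public mutable fields is a simplification made for the example, not how Spark's Task holds them.

```scala
// Hypothetical context with just the interruption flag.
final class KillSketchContext {
  @volatile private var interrupted = false
  def markInterrupted(): Unit = { interrupted = true }
  def isInterrupted: Boolean = interrupted
}

final class KillSketchTask {
  @volatile private var killedFlag = false
  // Both stay null until the task actually starts running.
  @volatile var context: KillSketchContext = null
  @volatile var taskThread: Thread = null

  def killed: Boolean = killedFlag

  def kill(interruptThread: Boolean): Unit = {
    killedFlag = true                                // 1. mark as killed
    if (context != null) context.markInterrupted()  // 2. tell the context
    if (interruptThread && taskThread != null)       // 3. optionally interrupt
      taskThread.interrupt()
  }
}
```

The null checks make kill safe to call before the task has started, in which case only the flag is set.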
Caution
|
FIXME When could context and interruptThread not be set?
|